Fix deferrable BeamRunPythonPipelineOperator fails with 400 when job-id absent from launcher stdout #68720
MaksYermak
left a comment
The current PR is a copy of #67711, and that PR was closed because its solution does not solve the problem described in the issue.
The issue is not the None value for the Dataflow job ID. It is a bug in deferrable mode itself, which the issue describes well in this paragraph:
- Without the id, there is no real async execution at all. The launcher's stdout loop only short-circuits once the id is seen; with no id it blocks until the subprocess exits — i.e. it runs the entire Dataflow job synchronously on the worker. By the time the operator reaches the deferrable branch the job has already finished, so deferring buys nothing (no worker slot is freed during the job) and then fails on resume because it defers with job_id=None. So the deferrable feature is not merely buggy in this case — it is structurally unable to defer until after the job id is captured, which is exactly what's missing.
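The blocking behaviour described in that paragraph can be sketched in a few lines. This is only an illustration under stated assumptions: JOB_ID_LINE is a simplified, hypothetical pattern (the provider's real JOB_ID_PATTERN may differ), scan_launcher_output() is a made-up name, and a plain list of strings stands in for the launcher's stdout.

```python
import re
from typing import Iterable, Optional, Tuple

# Hypothetical, simplified pattern; the provider's real JOB_ID_PATTERN may differ.
JOB_ID_LINE = re.compile(r"Created job with id: \[(?P<job_id>[^\]\s]+)\]")


def scan_launcher_output(lines: Iterable[str]) -> Tuple[Optional[str], int]:
    """Return (job_id, lines_consumed), stopping early once an id is seen."""
    consumed = 0
    for line in lines:
        consumed += 1
        match = JOB_ID_LINE.search(line)
        if match:
            # Short-circuit: a caller could defer here while the job still runs.
            return match.group("job_id"), consumed
    # No id was ever logged: the loop ends only when the output itself ends,
    # which for a real subprocess means the process has exited.
    return None, consumed


# Stand-ins for launcher stdout, with and without the INFO-level id line.
with_id = [
    "starting launcher",
    "Created job with id: [2024-01-01_00_00_00-123]",
    "job running",
    "job finished",
]
without_id = ["starting launcher", "job running", "job finished"]

print(scan_launcher_output(with_id))
print(scan_launcher_output(without_id))
```

With the id line present, the scan stops after two lines. Without it, every line is consumed before the function returns, which is the synchronous behaviour the issue describes.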
…auncher stdout
When a DataflowRunner pipeline does not configure INFO-level logging, the Beam SDK's 'Created job with id: [...]' line is suppressed, so JOB_ID_PATTERN never matches and self.dataflow_job_id stays None. The synchronous wait_for_done() path already handles this by resolving the job by name prefix; the deferrable path did not, so it deferred with job_id=None and the Dataflow API immediately rejected the trigger with '400 Request must contain a job and project id.'
Fix: before constructing the trigger in execute_on_dataflow() for both BeamRunPythonPipelineOperator and BeamRunJavaPipelineOperator, if dataflow_job_id is still None, call a new DataflowHook.get_job_id_by_name() helper that looks up the job by name prefix via the Dataflow REST API. This mirrors the existing synchronous fallback and ensures the trigger always receives a valid ID.
Closes apache#68279
904fec4 to f811723
Thanks for the review and for pointing this out clearly. After digging into it more, I agree I did not target the actual issue correctly. My changes were focused on avoiding the immediate job_id=None failure, but they do not address the more important problem you described: in this case the deferrable path is not truly deferring early, because it still depends on when the launcher process yields enough information. So I’m going to close this PR rather than keep pushing an incomplete direction. If I come back to this, I’ll start from the actual deferrable execution semantics and propose the approach in the issue first before sending another PR. Thanks again for the correction.
Closes #68279
Problem
A BeamRunPythonPipelineOperator (or its Java variant) with deferrable=True and runner="DataflowRunner" fails with '400 Request must contain a job and project id.' whenever the Beam launcher subprocess stdout does not contain the 'Created job with id: [...]' line. This happens routinely when the pipeline does not configure INFO-level logging, since the Beam SDK emits that line at INFO while Python's root logger defaults to WARNING.
The synchronous path (deferrable=False) already handles this: DataflowHook.wait_for_done() falls back to resolving the job by name prefix when job_id=None. The deferrable path had no such fallback: it passed job_id=None directly into the trigger, which the Dataflow API immediately rejected.
Fix
Before building the trigger in execute_on_dataflow(), if self.dataflow_job_id is still None after the launcher finishes, call a new DataflowHook.get_job_id_by_name() helper that resolves the ID via the Dataflow REST API by name prefix. This mirrors the existing synchronous fallback so both paths behave consistently.
The fix applies to both BeamRunPythonPipelineOperator and BeamRunJavaPipelineOperator.
Changes
- providers/google/.../hooks/dataflow.py: add DataflowHook.get_job_id_by_name(), which looks up the most recently submitted job whose name starts with the given prefix and returns its ID.
- providers/apache/beam/.../operators/beam.py: in execute_on_dataflow() for both Python and Java operators, call the new helper when dataflow_job_id is None before deferring.
- providers/apache/beam/tests/.../test_beam.py: add test_exec_dataflow_runner_defers_with_resolved_job_id_when_stdout_missing for both operators, asserting get_job_id_by_name is called and the trigger carries the resolved ID.
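As a rough illustration of the fallback described above, the sketch below picks the most recently created job whose name starts with a prefix. It is not the code in this PR: find_job_id_by_name_prefix() and job_id_for_trigger() are hypothetical names, the job dictionaries are hand-written stand-ins assumed to carry name, id and createTime fields, and the timestamp parsing is deliberately simplified.

```python
from datetime import datetime
from typing import Iterable, Mapping, Optional


def _parse_create_time(value: str) -> datetime:
    # Simplification: assumes timestamps shaped like "2024-01-01T10:00:00Z".
    # Real API timestamps may carry fractional seconds and need a laxer parser.
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")


def find_job_id_by_name_prefix(
    jobs: Iterable[Mapping[str, str]], prefix: str
) -> Optional[str]:
    """Return the id of the newest job whose name starts with prefix, if any."""
    candidates = [job for job in jobs if job["name"].startswith(prefix)]
    if not candidates:
        return None
    newest = max(candidates, key=lambda job: _parse_create_time(job["createTime"]))
    return newest["id"]


def job_id_for_trigger(
    stdout_job_id: Optional[str], jobs: Iterable[Mapping[str, str]], prefix: str
) -> Optional[str]:
    """Prefer the id parsed from stdout; otherwise fall back to the name lookup."""
    if stdout_job_id is not None:
        return stdout_job_id
    return find_job_id_by_name_prefix(jobs, prefix)


# Hand-written sample listing (hypothetical ids and names).
jobs = [
    {"id": "job-old", "name": "beam-etl-1a2b", "createTime": "2024-01-01T10:00:00Z"},
    {"id": "job-new", "name": "beam-etl-3c4d", "createTime": "2024-01-02T09:30:00Z"},
    {"id": "job-other", "name": "reporting-5e6f", "createTime": "2024-01-03T08:00:00Z"},
]

print(job_id_for_trigger(None, jobs, "beam-etl"))
print(job_id_for_trigger("job-from-stdout", jobs, "beam-etl"))
```

A lookup of this kind only runs once the launcher has returned, so it can supply an ID to the trigger but does not change when the operator is able to defer.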